package com.bawei.persona.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.bawei.persona.realtime.app.func.OrderWideDimAsyncFunction;
import com.bawei.persona.realtime.bean.OrderDetail;
import com.bawei.persona.realtime.bean.OrderInfo;
import com.bawei.persona.realtime.bean.OrderProtalBean;
import com.bawei.persona.realtime.bean.OrderWide;
import com.bawei.persona.realtime.common.GmallConfig;
import com.bawei.persona.realtime.util.ClickHouseUtil;
import com.bawei.persona.realtime.util.MyKafkaUtil;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * 项目规划及管理
 * 上海大数据学院院长 ：孙丰朝
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11


 * 业务执行流程
 * 1.模拟生成数据jar
 * 2.在MySQL数据库的表中插入相关的业务数据
 * 3.MySQL的Binlog记录变化的数据
 * 4.Maxwell会将变化的数据采集到，并且发送到Kafka的ods_base_db_m
 * 5.BaseDBApp从ods_base_db_m主题中读取数据，进行分流操作
 * -事实数据  发送到kafka的dwd层
 * dwd_order_info
 * dwd_order_detail
 * <p>
 * -维度数据	发送到Hbase的表中
 * DIM_USER_INFO
 * 6.OrderWideApp从kafka的dwd层和Hbase的维度表中读取数据
 * -双流join
 * 使用的是intervalJoin  对OrderInfo和OrderDetail进行join
 * -维度关联
 * 旁路缓存
 * -异步查询
 * 自定义函数类DimAsyncFunction 继承 RichAsyncFunction类
 * 重写类中的asyncInvoke，在该方法中，从线程池中获取新的线程，并执行维度关联操作
 * 模板方法设计模式：在父类中定义业务执行的模板，实现延迟到子类中完成
 * 在主程序OrderWideApp中调用函数
 * AsyncDataStream.unorderedWait(
 * stream,
 * new DimAsyncFunction(维度表名){
 * 重写getKey
 * 重写join
 * },
 * 1000,
 * TimeUnit.MILLISECONDS,
 * 100
 * );
 */
public class OrderWideAppNew {

    // DateTimeFormatter is immutable and thread-safe (unlike SimpleDateFormat),
    // so one shared instance can safely be used by every operator subtask.
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Parses a "yyyy-MM-dd HH:mm:ss" create_time string into epoch milliseconds.
     * Uses the JVM default time zone, matching the zone semantics of the previous
     * SimpleDateFormat-based implementation.
     *
     * @param createTime non-null timestamp string in "yyyy-MM-dd HH:mm:ss" form
     * @return epoch milliseconds of the given local date-time
     */
    private static long toEpochMillis(String createTime) {
        return LocalDateTime.parse(createTime, CREATE_TIME_FORMAT)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }

    /**
     * Builds the DWM order-wide job: joins order and order-detail facts from the
     * Kafka DWD layer, enriches them with six dimensions via async lookups, then
     * writes the widened records back to Kafka (DWM) and into ClickHouse.
     */
    public static void main(String[] args) throws Exception {
        //TODO 1. Environment setup (local test stream environment)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Checkpointing intentionally left disabled for local testing; enable for production.
        //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(60000);
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/uniquevisit"))

        //TODO 2. Read order and order-detail facts from the Kafka DWD layer
        String orderInfoSourceTopic = "dwd_order_info";
        String orderDetailSourceTopic = "dwd_order_detail";
        String orderWideSinkTopic = "dwm_order_wide";
        String groupId = "order_wide_group";

        FlinkKafkaConsumer<String> orderInfoSource = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic, groupId);
        DataStreamSource<String> orderInfoJsonStrDS = env.addSource(orderInfoSource);

        FlinkKafkaConsumer<String> orderDetailSource = MyKafkaUtil.getKafkaSource(orderDetailSourceTopic, groupId);
        DataStreamSource<String> orderDetailJsonStrDS = env.addSource(orderDetailSource);

        //TODO 3. Convert JSON strings to POJOs and derive create_ts (epoch millis)
        //        from the create_time string; create_ts is the event-time field.
        SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoJsonStrDS.map(
            new MapFunction<String, OrderInfo>() {
                @Override
                public OrderInfo map(String jsonStr) throws Exception {
                    OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
                    orderInfo.setCreate_ts(toEpochMillis(orderInfo.getCreate_time()));
                    return orderInfo;
                }
            }
        );

        SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailJsonStrDS.map(
            new MapFunction<String, OrderDetail>() {
                @Override
                public OrderDetail map(String jsonStr) throws Exception {
                    OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
                    orderDetail.setCreate_ts(toEpochMillis(orderDetail.getCreate_time()));
                    return orderDetail;
                }
            }
        );

        // Debug output; consider removing or guarding before production use.
        orderInfoDS.print("orderInfo>>>");
        orderDetailDS.print("orderDetail>>>");

        //TODO 4. Assign event timestamps and watermarks (3 s bounded out-of-orderness)
        SingleOutputStreamOperator<OrderInfo> orderInfoWithTsDS = orderInfoDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                        return orderInfo.getCreate_ts();
                    }
                })
        );

        SingleOutputStreamOperator<OrderDetail> orderDetailWithTsDS = orderDetailDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                        return orderDetail.getCreate_ts();
                    }
                })
        );

        //TODO 5. Key both streams by order id — the interval-join key
        KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithTsDS.keyBy(OrderInfo::getId);
        KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithTsDS.keyBy(OrderDetail::getOrder_id);

        //TODO 6. Interval-join order with order detail into an order-wide record.
        // NOTE(review): the join window is only ±5 *milliseconds*. This works only
        // because an order and its details carry the identical create_time; if the
        // two sides can ever differ, widen this (e.g. Time.seconds(-5/5)) — confirm.
        SingleOutputStreamOperator<JSONObject> orderWideDS = orderInfoKeyedDS
                .intervalJoin(orderDetailKeyedDS)
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(
                        new ProcessJoinFunction<OrderInfo, OrderDetail, JSONObject>() {
                            @Override
                            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<JSONObject> out) throws Exception {
                                OrderWide orderWide = new OrderWide(orderInfo, orderDetail);
                                // Re-serialize the merged record as a JSONObject so the
                                // downstream dimension joins can attach fields generically.
                                String jsonString = JSON.toJSONString(orderWide);
                                JSONObject jsonObject = JSON.parseObject(jsonString);
                                out.collect(jsonObject);
                            }
                        }
                );

        orderWideDS.print("orderWide>>>>");

        // Dimension enrichment: each step performs an async lookup (cache-backed)
        // via OrderWideDimAsyncFunction, with a 60 s timeout and the default
        // in-flight request capacity of AsyncDataStream.unorderedWait.

        //TODO 7. Join user dimension
        SingleOutputStreamOperator<JSONObject> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                orderWideDS,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_USER_INFO),
                60, TimeUnit.SECONDS
        );

        orderWideWithUserDS.print(">>>>>");

        //TODO 8. Join province dimension
        SingleOutputStreamOperator<JSONObject> orderWideWithProvice = AsyncDataStream.unorderedWait(
                orderWideWithUserDS,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_PROVICE),
                60, TimeUnit.SECONDS
        );

        //TODO 9. Join SKU dimension
        SingleOutputStreamOperator<JSONObject> orderWideWitSku = AsyncDataStream.unorderedWait(
                orderWideWithProvice,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_SKU_INFO),
                60, TimeUnit.SECONDS
        );

        //TODO 10. Join SPU dimension
        SingleOutputStreamOperator<JSONObject> orderWideWitSpu = AsyncDataStream.unorderedWait(
                orderWideWitSku,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_SPU_INFO),
                60, TimeUnit.SECONDS
        );

        //TODO 11. Join category (level-3) dimension
        SingleOutputStreamOperator<JSONObject> orderWideWithCatege3 = AsyncDataStream.unorderedWait(
                orderWideWitSpu,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_BASE_CATEGORY3),
                60, TimeUnit.SECONDS
        );

        //TODO 12. Join trademark dimension
        SingleOutputStreamOperator<JSONObject> orderWideWithTrademark = AsyncDataStream.unorderedWait(
                orderWideWithCatege3,
                new OrderWideDimAsyncFunction(GmallConfig.DIM_BASE_TRADEMARK),
                60, TimeUnit.SECONDS
        );

        orderWideWithTrademark.print(">>>>>");

        //TODO 13. Write the fully-enriched order-wide records back to the Kafka DWM layer
        orderWideWithTrademark
            .map(
                orderWide -> JSON.toJSONString(orderWide)
            )
            .addSink(MyKafkaUtil.getKafkaSink(orderWideSinkTopic));

        //TODO 14. Write to ClickHouse — the data source for statistics-based
        //         user-portrait metric computation downstream.
        SingleOutputStreamOperator<OrderProtalBean> orderProtalStream = orderWideWithTrademark.map(
            new MapFunction<JSONObject, OrderProtalBean>() {
                @Override
                public OrderProtalBean map(JSONObject jsonObject) throws Exception {
                    return jsonObject.toJavaObject(OrderProtalBean.class);
                }
            }
        );
        orderProtalStream.addSink(
                ClickHouseUtil.getJdbcSink(" insert into protrait_order(category1_id,category2_id,category3_id,category1_name,category2_name, category3_name,carrier,carriername, user_gendertype,user_gender,province_id,province_name,tm_name,email, emailtype,user_age,yearbasetype,yearbasename,create_time,user_id,order_id,order_price,sku_num ,coupon_reduce_amount,total_amount,activity_reduce_amount,spu_name) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        );

        env.execute();
    }
}
